Skip to content
GitLab
Projects
Groups
Topics
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
hpc
gekkofs
Compare revisions
a3c90ffba27fa813fdf091599fb3235f7b93b41c to 957f7a63be953576cac9193ce77db075204c35c2
Hide whitespace changes
Inline
Side-by-side
src/client/preload_context.cpp
View file @
957f7a63
...
...
@@ -65,6 +65,8 @@ PreloadContext::PreloadContext()
char
host
[
255
];
gethostname
(
host
,
255
);
hostname
=
host
;
PreloadContext
::
set_replicas
(
std
::
stoi
(
gkfs
::
env
::
get_var
(
gkfs
::
env
::
NUM_REPL
,
"0"
)));
}
void
...
...
@@ -452,5 +454,15 @@ PreloadContext::get_hostname() {
return
hostname
;
}
void
PreloadContext
::
set_replicas
(
const
int
repl
)
{
replicas_
=
repl
;
}
int
PreloadContext
::
get_replicas
()
{
return
replicas_
;
}
}
// namespace preload
}
// namespace gkfs
src/client/preload_util.cpp
View file @
957f7a63
...
...
@@ -200,16 +200,26 @@ namespace gkfs::utils {
optional
<
gkfs
::
metadata
::
Metadata
>
get_metadata
(
const
string
&
path
,
bool
follow_links
)
{
std
::
string
attr
;
auto
err
=
gkfs
::
rpc
::
forward_stat
(
path
,
attr
);
auto
err
=
gkfs
::
rpc
::
forward_stat
(
path
,
attr
,
0
);
// TODO: retry on failure
if
(
err
)
{
errno
=
err
;
return
{};
auto
copy
=
1
;
while
(
copy
<
CTX
->
get_replicas
()
+
1
&&
err
)
{
LOG
(
ERROR
,
"Retrying Stat on replica {} {}"
,
copy
,
follow_links
);
err
=
gkfs
::
rpc
::
forward_stat
(
path
,
attr
,
copy
);
copy
++
;
}
if
(
err
)
{
errno
=
err
;
return
{};
}
}
#ifdef HAS_SYMLINKS
if
(
follow_links
)
{
gkfs
::
metadata
::
Metadata
md
{
attr
};
while
(
md
.
is_link
())
{
err
=
gkfs
::
rpc
::
forward_stat
(
md
.
target_path
(),
attr
);
err
=
gkfs
::
rpc
::
forward_stat
(
md
.
target_path
(),
attr
,
0
);
if
(
err
)
{
errno
=
err
;
return
{};
...
...
src/client/rpc/forward_data.cpp
View file @
957f7a63
...
...
@@ -34,6 +34,7 @@
#include
<common/rpc/distributor.hpp>
#include
<common/arithmetic/arithmetic.hpp>
#include
<common/rpc/rpc_util.hpp>
#include
<unordered_set>
...
...
@@ -42,21 +43,26 @@ using namespace std;
namespace
gkfs
::
rpc
{
/*
* This file includes all
meta
data RPC calls.
* This file includes all data RPC calls.
* NOTE: No errno is defined here!
*/
/**
* Send an RPC request to write from a buffer.
* There is a bitset of 1024 chunks to tell the server
* which chunks to process. Exceeding this value will work without
* replication. Another way is to leverage mercury segments.
* TODO: Decide how to manage a write to a replica that doesn't exist
* @param path
* @param buf
* @param append_flag
* @param write_size
* @param num_copies number of replicas
* @return pair<error code, written size>
*/
pair
<
int
,
ssize_t
>
forward_write
(
const
string
&
path
,
const
void
*
buf
,
const
off64_t
offset
,
const
size_t
write_size
)
{
const
size_t
write_size
,
const
int8_t
num_copies
)
{
// import pow2-optimized arithmetic functions
using
namespace
gkfs
::
utils
::
arithmetic
;
...
...
@@ -69,35 +75,50 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto
chnk_end
=
block_index
((
offset
+
write_size
)
-
1
,
gkfs
::
config
::
rpc
::
chunksize
);
auto
chnk_total
=
(
chnk_end
-
chnk_start
)
+
1
;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std
::
map
<
uint64_t
,
std
::
vector
<
uint64_t
>>
target_chnks
{};
// contains the target ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std
::
vector
<
uint64_t
>
targets
{};
// targets for the first and last chunk as they need special treatment
uint64_t
chnk_start_target
=
0
;
uint64_t
chnk_end_target
=
0
;
// We need a set to manage replicas.
std
::
set
<
uint64_t
>
chnk_start_target
{};
std
::
set
<
uint64_t
>
chnk_end_target
{};
for
(
uint64_t
chnk_id
=
chnk_start
;
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
auto
target
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
);
std
::
unordered_map
<
uint64_t
,
std
::
vector
<
uint8_t
>>
write_ops_vect
;
if
(
target_chnks
.
count
(
target
)
==
0
)
{
target_chnks
.
insert
(
std
::
make_pair
(
target
,
std
::
vector
<
uint64_t
>
{
chnk_id
}));
targets
.
push_back
(
target
);
}
else
{
target_chnks
[
target
].
push_back
(
chnk_id
);
}
// If num_copies is 0, we do the normal write operation. Otherwise
// we process all the replicas.
for
(
uint64_t
chnk_id
=
chnk_start
;
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
for
(
auto
copy
=
num_copies
?
1
:
0
;
copy
<
num_copies
+
1
;
copy
++
)
{
auto
target
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
,
copy
);
if
(
write_ops_vect
.
find
(
target
)
==
write_ops_vect
.
end
())
write_ops_vect
[
target
]
=
std
::
vector
<
uint8_t
>
(((
chnk_total
+
7
)
/
8
));
gkfs
::
rpc
::
set_bitset
(
write_ops_vect
[
target
],
chnk_id
-
chnk_start
);
if
(
target_chnks
.
count
(
target
)
==
0
)
{
target_chnks
.
insert
(
std
::
make_pair
(
target
,
std
::
vector
<
uint64_t
>
{
chnk_id
}));
targets
.
push_back
(
target
);
}
else
{
target_chnks
[
target
].
push_back
(
chnk_id
);
}
// set first and last chnk targets
if
(
chnk_id
==
chnk_start
)
{
chnk_start_target
=
target
;
}
// set first and last chnk targets
if
(
chnk_id
==
chnk_start
)
{
chnk_start_target
.
insert
(
target
)
;
}
if
(
chnk_id
==
chnk_end
)
{
chnk_end_target
=
target
;
if
(
chnk_id
==
chnk_end
)
{
chnk_end_target
.
insert
(
target
);
}
}
}
...
...
@@ -133,13 +154,13 @@ forward_write(const string& path, const void* buf, const off64_t offset,
target_chnks
[
target
].
size
()
*
gkfs
::
config
::
rpc
::
chunksize
;
// receiver of first chunk must subtract the offset from first chunk
if
(
target
=
=
chnk_start_target
)
{
if
(
chnk_start_target
.
end
()
!
=
chnk_start_target
.
find
(
target
)
)
{
total_chunk_size
-=
block_overrun
(
offset
,
gkfs
::
config
::
rpc
::
chunksize
);
}
// receiver of last chunk must subtract
if
(
target
=
=
chnk_end_target
&&
if
(
chnk_end_target
.
end
()
!
=
chnk_end_target
.
find
(
target
)
&&
!
is_aligned
(
offset
+
write_size
,
gkfs
::
config
::
rpc
::
chunksize
))
{
total_chunk_size
-=
block_underrun
(
offset
+
write_size
,
gkfs
::
config
::
rpc
::
chunksize
);
...
...
@@ -148,7 +169,6 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto
endp
=
CTX
->
hosts
().
at
(
target
);
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
gkfs
::
rpc
::
write_data
::
input
in
(
...
...
@@ -158,6 +178,7 @@ forward_write(const string& path, const void* buf, const off64_t offset,
block_overrun
(
offset
,
gkfs
::
config
::
rpc
::
chunksize
),
target
,
CTX
->
hosts
().
size
(),
// number of chunks handled by that destination
gkfs
::
rpc
::
compress_bitset
(
write_ops_vect
[
target
]),
target_chnks
[
target
].
size
(),
// chunk start id of this write
chnk_start
,
...
...
@@ -175,25 +196,26 @@ forward_write(const string& path, const void* buf, const off64_t offset,
ld_network_service
->
post
<
gkfs
::
rpc
::
write_data
>
(
endp
,
in
));
LOG
(
DEBUG
,
"host: {}, path:
\"
{}
\"
, chunks: {}, size: {}, offset: {}"
,
target
,
path
,
in
.
ch
u
nk_
n
(),
total_
ch
u
nk_
size
,
in
.
offset
());
"host: {}, path:
\"
{}
\"
,
chunk_start: {}, chunk_end: {},
chunks: {}, size: {}, offset: {}"
,
target
,
path
,
chnk_
start
,
chnk_
end
,
in
.
chunk_n
(),
total_chunk_size
,
in
.
offset
());
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Unable to send non-blocking rpc for "
"path
\"
{}
\"
[peer: {}]"
,
path
,
target
);
return
make_pair
(
EBUSY
,
0
);
if
(
num_copies
==
0
)
return
make_pair
(
EBUSY
,
0
);
}
}
// Wait for RPC responses and then get response and add it to out_size
// which is the written size All potential outputs are served to free
// resources regardless of errors, although an errorcode is set.
auto
err
=
0
;
ssize_t
out_size
=
0
;
std
::
size_t
idx
=
0
;
#ifdef REPLICA_CHECK
std
::
vector
<
uint8_t
>
fill
(
chnk_total
);
auto
write_ops
=
write_ops_vect
.
begin
();
#endif
for
(
const
auto
&
h
:
handles
)
{
try
{
// XXX We might need a timeout here to not wait forever for an
...
...
@@ -203,18 +225,52 @@ forward_write(const string& path, const void* buf, const off64_t offset,
if
(
out
.
err
()
!=
0
)
{
LOG
(
ERROR
,
"Daemon reported error: {}"
,
out
.
err
());
err
=
out
.
err
();
}
else
{
out_size
+=
static_cast
<
size_t
>
(
out
.
io_size
());
#ifdef REPLICA_CHECK
if
(
num_copies
)
{
if
(
fill
.
size
()
==
0
)
{
fill
=
write_ops
->
second
;
}
else
{
for
(
size_t
i
=
0
;
i
<
fill
.
size
();
i
++
)
{
fill
[
i
]
|=
write_ops
->
second
[
i
];
}
}
}
write_ops
++
;
#endif
}
out_size
+=
static_cast
<
size_t
>
(
out
.
io_size
());
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Failed to get rpc output for path
\"
{}
\"
[peer: {}]"
,
path
,
targets
[
idx
]);
err
=
EIO
;
}
idx
++
;
}
// As servers can fail (and we cannot know if the total data is written), we
// send the updated size but check that at least one copy of all chunks are
// processed.
if
(
num_copies
)
{
// A bit-wise or should show that all the chunks are written (255)
out_size
=
write_size
;
#ifdef REPLICA_CHECK
for
(
size_t
i
=
0
;
i
<
fill
.
size
()
-
1
;
i
++
)
{
if
(
fill
[
i
]
!=
255
)
{
err
=
EIO
;
break
;
}
}
// Process the leftover bytes
for
(
uint64_t
chnk_id
=
(
chnk_start
+
(
fill
.
size
()
-
1
)
*
8
);
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
if
(
!
(
fill
[(
chnk_id
-
chnk_start
)
/
8
]
&
(
1
<<
((
chnk_id
-
chnk_start
)
%
8
))))
{
err
=
EIO
;
break
;
}
}
#endif
}
/*
* Typically file systems return the size even if only a part of it was
* written. In our case, we do not keep track which daemon fully wrote its
...
...
@@ -232,11 +288,14 @@ forward_write(const string& path, const void* buf, const off64_t offset,
* @param buf
* @param offset
* @param read_size
* @param num_copies number of copies available (0 is no replication)
* @param failed nodes failed that should not be used
* @return pair<error code, read size>
*/
pair
<
int
,
ssize_t
>
forward_read
(
const
string
&
path
,
void
*
buf
,
const
off64_t
offset
,
const
size_t
read_size
)
{
const
size_t
read_size
,
const
int8_t
num_copies
,
std
::
set
<
int8_t
>&
failed
)
{
// import pow2-optimized arithmetic functions
using
namespace
gkfs
::
utils
::
arithmetic
;
...
...
@@ -246,19 +305,35 @@ forward_read(const string& path, void* buf, const off64_t offset,
auto
chnk_start
=
block_index
(
offset
,
gkfs
::
config
::
rpc
::
chunksize
);
auto
chnk_end
=
block_index
((
offset
+
read_size
-
1
),
gkfs
::
config
::
rpc
::
chunksize
);
auto
chnk_total
=
(
chnk_end
-
chnk_start
)
+
1
;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std
::
map
<
uint64_t
,
std
::
vector
<
uint64_t
>>
target_chnks
{};
// contains the recipient ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std
::
vector
<
uint64_t
>
targets
{};
// targets for the first and last chunk as they need special treatment
uint64_t
chnk_start_target
=
0
;
uint64_t
chnk_end_target
=
0
;
std
::
unordered_map
<
uint64_t
,
std
::
vector
<
uint8_t
>>
read_bitset_vect
;
for
(
uint64_t
chnk_id
=
chnk_start
;
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
auto
target
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
);
auto
target
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
,
0
);
if
(
num_copies
>
0
)
{
// If we have some failures we select another copy (randomly).
while
(
failed
.
find
(
target
)
!=
failed
.
end
())
{
LOG
(
DEBUG
,
"Selecting another node, target: {} down"
,
target
);
target
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
,
rand
()
%
num_copies
);
}
}
if
(
read_bitset_vect
.
find
(
target
)
==
read_bitset_vect
.
end
())
read_bitset_vect
[
target
]
=
std
::
vector
<
uint8_t
>
(((
chnk_total
+
7
)
/
8
));
read_bitset_vect
[
target
][(
chnk_id
-
chnk_start
)
/
8
]
|=
1
<<
((
chnk_id
-
chnk_start
)
%
8
);
// set
if
(
target_chnks
.
count
(
target
)
==
0
)
{
target_chnks
.
insert
(
...
...
@@ -303,6 +378,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// TODO(amiranda): This could be simplified by adding a vector of inputs
// to async_engine::broadcast(). This would allow us to avoid manually
// looping over handles as we do below
for
(
const
auto
&
target
:
targets
)
{
// total chunk_size for target
...
...
@@ -334,6 +410,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// a potential offset
block_overrun
(
offset
,
gkfs
::
config
::
rpc
::
chunksize
),
target
,
CTX
->
hosts
().
size
(),
gkfs
::
rpc
::
compress_bitset
(
read_bitset_vect
[
target
]),
// number of chunks handled by that destination
target_chnks
[
target
].
size
(),
// chunk start id of this write
...
...
@@ -343,11 +420,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
// total size to write
total_chunk_size
,
local_buffers
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
read_data
>
(
endp
,
in
));
...
...
@@ -394,9 +472,15 @@ forward_read(const string& path, void* buf, const off64_t offset,
LOG
(
ERROR
,
"Failed to get rpc output for path
\"
{}
\"
[peer: {}]"
,
path
,
targets
[
idx
]);
err
=
EIO
;
// We should get targets[idx] and remove from the list of peers
failed
.
insert
(
targets
[
idx
]);
// Then repeat the read with another peer (We repear the full
// read, this can be optimised but it is a cornercase)
}
idx
++
;
}
/*
* Typically file systems return the size even if only a part of it was
* read. In our case, we do not keep track which daemon fully read its
...
...
@@ -413,11 +497,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
* @param path
* @param current_size
* @param new_size
* @param num_copies Number of replicas
* @return error code
*/
int
forward_truncate
(
const
std
::
string
&
path
,
size_t
current_size
,
size_t
new_size
)
{
forward_truncate
(
const
std
::
string
&
path
,
size_t
current_size
,
size_t
new_size
,
const
int8_t
num_copies
)
{
// import pow2-optimized arithmetic functions
using
namespace
gkfs
::
utils
::
arithmetic
;
...
...
@@ -434,7 +519,9 @@ forward_truncate(const std::string& path, size_t current_size,
std
::
unordered_set
<
unsigned
int
>
hosts
;
for
(
unsigned
int
chunk_id
=
chunk_start
;
chunk_id
<=
chunk_end
;
++
chunk_id
)
{
hosts
.
insert
(
CTX
->
distributor
()
->
locate_data
(
path
,
chunk_id
));
for
(
auto
copy
=
0
;
copy
<
(
num_copies
+
1
);
++
copy
)
{
hosts
.
insert
(
CTX
->
distributor
()
->
locate_data
(
path
,
chunk_id
,
copy
));
}
}
std
::
vector
<
hermes
::
rpc_handle
<
gkfs
::
rpc
::
trunc_data
>>
handles
;
...
...
@@ -450,20 +537,23 @@ forward_truncate(const std::string& path, size_t current_size,
gkfs
::
rpc
::
trunc_data
::
input
in
(
path
,
new_size
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
trunc_data
>
(
endp
,
in
));
}
catch
(
const
std
::
exception
&
ex
)
{
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG
(
ERROR
,
"Failed to send request to host: {}"
,
host
);
err
=
EIO
;
break
;
// We need to gather all responses so we can't return here
break
;
// We need to gather all responses so we can't return
// here
}
}
...
...
@@ -503,20 +593,23 @@ forward_get_chunk_stat() {
gkfs
::
rpc
::
chunk_stat
::
input
in
(
0
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
chunk_stat
>
(
endp
,
in
));
}
catch
(
const
std
::
exception
&
ex
)
{
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG
(
ERROR
,
"Failed to send request to host: {}"
,
endp
.
to_string
());
err
=
EBUSY
;
break
;
// We need to gather all responses so we can't return here
break
;
// We need to gather all responses so we can't return
// here
}
}
...
...
@@ -547,9 +640,11 @@ forward_get_chunk_stat() {
chunk_free
+=
out
.
chunk_free
();
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Failed to get RPC output from host: {}"
,
i
);
err
=
EBUSY
;
// Avoid setting err if a server fails.
// err = EBUSY;
}
}
if
(
err
)
return
make_pair
(
err
,
ChunkStat
{});
else
...
...
src/client/rpc/forward_management.cpp
View file @
957f7a63
...
...
@@ -45,19 +45,30 @@ forward_get_fs_config() {
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
local_host_id
());
gkfs
::
rpc
::
fs_config
::
output
out
;
try
{
LOG
(
DEBUG
,
"Retrieving file system configurations from daemon"
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out
=
ld_network_service
->
post
<
gkfs
::
rpc
::
fs_config
>
(
endp
).
get
().
at
(
0
);
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Retrieving fs configurations from daemon"
);
return
false
;
bool
found
=
false
;
size_t
idx
=
0
;
while
(
!
found
&&
idx
<=
CTX
->
hosts
().
size
())
{
try
{
LOG
(
DEBUG
,
"Retrieving file system configurations from daemon"
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out
=
ld_network_service
->
post
<
gkfs
::
rpc
::
fs_config
>
(
endp
).
get
().
at
(
0
);
found
=
true
;
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Retrieving fs configurations from daemon, possible reattempt at peer: {}"
,
idx
);
endp
=
CTX
->
hosts
().
at
(
idx
++
);
}
}
if
(
!
found
)
return
false
;
CTX
->
mountdir
(
out
.
mountdir
());
LOG
(
INFO
,
"Mountdir: '{}'"
,
CTX
->
mountdir
());
...
...
src/client/rpc/forward_metadata.cpp
View file @
957f7a63
...
...
@@ -51,12 +51,14 @@ namespace gkfs::rpc {
* Send an RPC for a create request
* @param path
* @param mode
* @param copy Number of replica to create
* @return error code
*/
int
forward_create
(
const
std
::
string
&
path
,
const
mode_t
mode
)
{
forward_create
(
const
std
::
string
&
path
,
const
mode_t
mode
,
const
int
copy
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -81,12 +83,14 @@ forward_create(const std::string& path, const mode_t mode) {
* Send an RPC for a stat request
* @param path
* @param attr
* @param copy metadata replica to read from
* @return error code
*/
int
forward_stat
(
const
std
::
string
&
path
,
string
&
attr
)
{
forward_stat
(
const
std
::
string
&
path
,
string
&
attr
,
const
int
copy
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -121,40 +125,44 @@ forward_stat(const std::string& path, string& attr) {
* This function only attempts data removal if data exists (determined when
* metadata is removed)
* @param path
* @param num_copies Replication scenarios with many replicas
* @return error code
*/
int
forward_remove
(
const
std
::
string
&
path
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
forward_remove
(
const
std
::
string
&
path
,
const
int8_t
num_copies
)
{
int64_t
size
=
0
;
uint32_t
mode
=
0
;
/*
* Send one RPC to metadata destination and remove metadata while retrieving
* size and mode to determine if data needs to removed too
*/
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto
out
=
ld_network_service
->
post
<
gkfs
::
rpc
::
remove_metadata
>
(
endp
,
path
)
.
get
()
.
at
(
0
);
LOG
(
DEBUG
,
"Got response success: {}"
,
out
.
err
());
for
(
auto
copy
=
0
;
copy
<
(
num_copies
+
1
);
copy
++
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
if
(
out
.
err
())
return
out
.
err
();
size
=
out
.
size
();
mode
=
out
.
mode
();
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"while getting rpc output"
);
return
EBUSY
;
/*
* Send one RPC to metadata destination and remove metadata while
* retrieving size and mode to determine if data needs to removed too
*/
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto
out
=
ld_network_service
->
post
<
gkfs
::
rpc
::
remove_metadata
>
(
endp
,
path
)
.
get
()
.
at
(
0
);
LOG
(
DEBUG
,
"Got response success: {}"
,
out
.
err
());
if
(
out
.
err
())
return
out
.
err
();
size
=
out
.
size
();
mode
=
out
.
mode
();
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"while getting rpc output"
);
return
EBUSY
;
}
}
// if file is not a regular file and it's size is 0, data does not need to
// be removed, thus, we exit
...
...
@@ -167,44 +175,54 @@ forward_remove(const std::string& path) {
// Small files
if
(
static_cast
<
std
::
size_t
>
(
size
/
gkfs
::
config
::
rpc
::
chunksize
)
<
CTX
->
hosts
().
size
())
{
const
auto
metadata_host_id
=
CTX
->
distributor
()
->
locate_file_metadata
(
path
);
const
auto
endp_metadata
=
CTX
->
hosts
().
at
(
metadata_host_id
);
try
{
LOG
(
DEBUG
,
"Sending RPC to host: {}"
,
endp_metadata
.
to_string
());
gkfs
::
rpc
::
remove_data
::
input
in
(
path
);
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
remove_data
>
(
endp_metadata
,
in
));
uint64_t
chnk_start
=
0
;
uint64_t
chnk_end
=
size
/
gkfs
::
config
::
rpc
::
chunksize
;
for
(
uint64_t
chnk_id
=
chnk_start
;
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
const
auto
chnk_host_id
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
);
if
constexpr
(
gkfs
::
config
::
metadata
::
implicit_data_removal
)
{
/*
* If the chnk host matches the metadata host the remove
* request as already been sent as part of the metadata
* remove request.
*/
if
(
chnk_host_id
==
metadata_host_id
)
continue
;
}
const
auto
endp_chnk
=
CTX
->
hosts
().
at
(
chnk_host_id
);
LOG
(
DEBUG
,
"Sending RPC to host: {}"
,
endp_chnk
.
to_string
());
for
(
auto
copymd
=
0
;
copymd
<
(
num_copies
+
1
);
copymd
++
)
{
const
auto
metadata_host_id
=
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copymd
);
const
auto
endp_metadata
=
CTX
->
hosts
().
at
(
metadata_host_id
);
try
{
LOG
(
DEBUG
,
"Sending RPC to host: {}"
,
endp_metadata
.
to_string
());
gkfs
::
rpc
::
remove_data
::
input
in
(
path
);
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
remove_data
>
(
endp_chnk
,
in
));
endp_metadata
,
in
));
uint64_t
chnk_start
=
0
;
uint64_t
chnk_end
=
size
/
gkfs
::
config
::
rpc
::
chunksize
;
for
(
uint64_t
chnk_id
=
chnk_start
;
chnk_id
<=
chnk_end
;
chnk_id
++
)
{
for
(
auto
copy
=
0
;
copy
<
(
num_copies
+
1
);
copy
++
)
{
const
auto
chnk_host_id
=
CTX
->
distributor
()
->
locate_data
(
path
,
chnk_id
,
copy
);
if
constexpr
(
gkfs
::
config
::
metadata
::
implicit_data_removal
)
{
/*
* If the chnk host matches the metadata host the
* remove request as already been sent as part of
* the metadata remove request.
*/
if
(
chnk_host_id
==
metadata_host_id
)
continue
;
}
const
auto
endp_chnk
=
CTX
->
hosts
().
at
(
chnk_host_id
);
LOG
(
DEBUG
,
"Sending RPC to host: {}"
,
endp_chnk
.
to_string
());
handles
.
emplace_back
(
ld_network_service
->
post
<
gkfs
::
rpc
::
remove_data
>
(
endp_chnk
,
in
));
}
}
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Failed to forward non-blocking rpc request reduced remove requests"
);
return
EBUSY
;
}
}
catch
(
const
std
::
exception
&
ex
)
{
LOG
(
ERROR
,
"Failed to forward non-blocking rpc request reduced remove requests"
);
return
EBUSY
;
}
}
else
{
// "Big" files
for
(
const
auto
&
endp
:
CTX
->
hosts
())
{
...
...
@@ -260,12 +278,14 @@ forward_remove(const std::string& path) {
* during a truncate() call.
* @param path
* @param length
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_decr_size
(
const
std
::
string
&
path
,
size_t
length
)
{
forward_decr_size
(
const
std
::
string
&
path
,
size_t
length
,
const
int
copy
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -295,14 +315,17 @@ forward_decr_size(const std::string& path, size_t length) {
* @param path
* @param md
* @param md_flags
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_update_metadentry
(
const
string
&
path
,
const
gkfs
::
metadata
::
Metadata
&
md
,
const
gkfs
::
metadata
::
MetadentryUpdateFlags
&
md_flags
)
{
forward_update_metadentry
(
const
string
&
path
,
const
gkfs
::
metadata
::
Metadata
&
md
,
const
gkfs
::
metadata
::
MetadentryUpdateFlags
&
md_flags
,
const
int
copy
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -348,6 +371,7 @@ forward_update_metadentry(
* This marks that this file doesn't have to be accessed directly
* Create a new md with the new name, which should have as value the old name
* All operations should check blockcnt and extract a NOTEXISTS
* The operations does not support replication
* @param oldpath
* @param newpath
* @param md
...
...
@@ -358,8 +382,8 @@ int
forward_rename
(
const
string
&
oldpath
,
const
string
&
newpath
,
const
gkfs
::
metadata
::
Metadata
&
md
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
oldpath
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
oldpath
,
0
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -405,8 +429,8 @@ forward_rename(const string& oldpath, const string& newpath,
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto
endp2
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
newpath
));
auto
endp2
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
newpath
,
0
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -479,53 +503,85 @@ forward_rename(const string& oldpath, const string& newpath,
/**
* Send an RPC request for an update to the file size.
* This is called during a write() call or similar
* A single correct call is needed only to progress.
* @param path
* @param size
* @param offset
* @param append_flag
* @param num_copies number of replicas
* @return pair<error code, size after update>
*/
pair<int, off64_t>
forward_update_metadentry_size(const string& path, const size_t size,
                               const off64_t offset, const bool append_flag,
                               const int num_copies) {
    // Post the size update to the primary metadata holder (copy 0) and to
    // every replica, then harvest all responses. The diff splice had left the
    // pre-replication single-endpoint code interleaved here, including an
    // unconditional error return that made the response-validation path
    // unreachable; this is the coherent replicated implementation.
    std::vector<hermes::rpc_handle<gkfs::rpc::update_metadentry_size>> handles;

    for(auto copy = 0; copy < num_copies + 1; copy++) {
        // each copy hashes to a (potentially) different daemon
        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path, copy));
        try {
            LOG(DEBUG, "Sending RPC ...");
            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
            // we can retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a post(endpoint)
            // returning one result and a broadcast(endpoint_set) returning a
            // result_set. When that happens we can remove the .at(0) :/
            handles.emplace_back(
                    ld_network_service
                            ->post<gkfs::rpc::update_metadentry_size>(
                                    endp, path, size, offset,
                                    bool_to_merc_bool(append_flag)));
        } catch(const std::exception& ex) {
            LOG(ERROR, "while getting rpc output");
            return make_pair(EBUSY, 0);
        }
    }

    // Collect the results: the call succeeds if at least one daemon answered
    // without error; the size reported by the last successful daemon wins.
    auto err = 0;
    ssize_t out_size = 0;
    auto idx = 0;
    bool valid = false;
    for(const auto& h : handles) {
        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);
            if(out.err() != 0) {
                LOG(ERROR, "Daemon {} reported error: {}", idx, out.err());
            } else {
                valid = true;
                out_size = out.ret_size();
            }
        } catch(const std::exception& ex) {
            LOG(ERROR, "Failed to get rpc output");
            if(!valid) {
                err = EIO;
            }
        }
        idx++;
    }

    if(!valid)
        return make_pair(err, 0);
    else
        return make_pair(0, out_size);
}
/**
* Send an RPC request to get the current file size.
* This is called during a lseek() call
* @param path
* @param copy Target replica (0 original)
* @return pair<error code, file size>
*/
pair
<
int
,
off64_t
>
forward_get_metadentry_size
(
const
std
::
string
&
path
)
{
forward_get_metadentry_size
(
const
std
::
string
&
path
,
const
int
copy
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
copy
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
@@ -831,7 +887,8 @@ forward_get_dirents_single(const string& path, int server) {
int
forward_mk_symlink
(
const
std
::
string
&
path
,
const
std
::
string
&
target_path
)
{
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
));
auto
endp
=
CTX
->
hosts
().
at
(
CTX
->
distributor
()
->
locate_file_metadata
(
path
,
0
));
try
{
LOG
(
DEBUG
,
"Sending RPC ..."
);
...
...
src/common/rpc/distributor.cpp
View file @
957f7a63
...
...
@@ -47,27 +47,34 @@ SimpleHashDistributor::localhost() const {
return
localhost_
;
}
unsigned
int
SimpleHashDistributor
::
hosts_size
()
const
{
return
hosts_size_
;
}
host_t
SimpleHashDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
)
const
{
return
str_hash
(
path
+
::
to_string
(
chnk_id
))
%
hosts_size_
;
SimpleHashDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
,
const
int
num_copy
)
const
{
return
(
str_hash
(
path
+
::
to_string
(
chnk_id
))
+
num_copy
)
%
hosts_size_
;
}
host_t
SimpleHashDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
,
unsigned
int
hosts_size
)
{
unsigned
int
hosts_size
,
const
int
num_copy
)
{
if
(
hosts_size_
!=
hosts_size
)
{
hosts_size_
=
hosts_size
;
all_hosts_
=
std
::
vector
<
unsigned
int
>
(
hosts_size
);
::
iota
(
all_hosts_
.
begin
(),
all_hosts_
.
end
(),
0
);
}
return
str_hash
(
path
+
::
to_string
(
chnk_id
))
%
hosts_size_
;
return
(
str_hash
(
path
+
::
to_string
(
chnk_id
))
+
num_copy
)
%
hosts_size_
;
}
host_t
SimpleHashDistributor
::
locate_file_metadata
(
const
string
&
path
)
const
{
return
str_hash
(
path
)
%
hosts_size_
;
SimpleHashDistributor
::
locate_file_metadata
(
const
string
&
path
,
const
int
num_copy
)
const
{
return
(
str_hash
(
path
)
+
num_copy
)
%
hosts_size_
;
}
::
vector
<
host_t
>
...
...
@@ -83,14 +90,20 @@ LocalOnlyDistributor::localhost() const {
return
localhost_
;
}
unsigned
int
LocalOnlyDistributor
::
hosts_size
()
const
{
return
hosts_size_
;
}
host_t
LocalOnlyDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
)
const
{
LocalOnlyDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
,
const
int
num_copy
)
const
{
return
localhost_
;
}
host_t
LocalOnlyDistributor
::
locate_file_metadata
(
const
string
&
path
)
const
{
LocalOnlyDistributor
::
locate_file_metadata
(
const
string
&
path
,
const
int
num_copy
)
const
{
return
localhost_
;
}
...
...
@@ -110,24 +123,32 @@ ForwarderDistributor::localhost() const {
return
fwd_host_
;
}
unsigned
int
ForwarderDistributor
::
hosts_size
()
const
{
return
hosts_size_
;
}
host_t
ForwarderDistributor
::
locate_data
(
const
std
::
string
&
path
,
const
chunkid_t
&
chnk_id
)
const
{
const
chunkid_t
&
chnk_id
,
const
int
num_copy
)
const
{
return
fwd_host_
;
}
host_t
ForwarderDistributor
::
locate_data
(
const
std
::
string
&
path
,
const
chunkid_t
&
chnk_id
,
unsigned
int
host_size
)
{
unsigned
int
host_size
,
const
int
num_copy
)
{
return
fwd_host_
;
}
host_t
ForwarderDistributor
::
locate_file_metadata
(
const
std
::
string
&
path
)
const
{
return
str_hash
(
path
)
%
hosts_size_
;
ForwarderDistributor
::
locate_file_metadata
(
const
std
::
string
&
path
,
const
int
num_copy
)
const
{
return
(
str_hash
(
path
)
+
num_copy
)
%
hosts_size_
;
}
std
::
vector
<
host_t
>
ForwarderDistributor
::
locate_directory_metadata
(
const
std
::
string
&
path
)
const
{
return
all_hosts_
;
...
...
@@ -213,21 +234,26 @@ GuidedDistributor::localhost() const {
return
localhost_
;
}
unsigned
int
GuidedDistributor
::
hosts_size
()
const
{
return
hosts_size_
;
}
host_t
GuidedDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
,
unsigned
int
hosts_size
)
{
unsigned
int
hosts_size
,
const
int
num_copy
)
{
if
(
hosts_size_
!=
hosts_size
)
{
hosts_size_
=
hosts_size
;
all_hosts_
=
std
::
vector
<
unsigned
int
>
(
hosts_size
);
::
iota
(
all_hosts_
.
begin
(),
all_hosts_
.
end
(),
0
);
}
return
(
locate_data
(
path
,
chnk_id
));
return
(
locate_data
(
path
,
chnk_id
,
num_copy
));
}
host_t
GuidedDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
)
const
{
GuidedDistributor
::
locate_data
(
const
string
&
path
,
const
chunkid_t
&
chnk_id
,
const
int
num_copy
)
const
{
auto
it
=
map_interval
.
find
(
path
);
if
(
it
!=
map_interval
.
end
())
{
auto
it_f
=
it
->
second
.
first
.
IsInsideInterval
(
chnk_id
);
...
...
@@ -245,14 +271,16 @@ GuidedDistributor::locate_data(const string& path,
}
auto
locate
=
path
+
::
to_string
(
chnk_id
);
return
str_hash
(
locate
)
%
hosts_size_
;
return
(
str_hash
(
locate
)
+
num_copy
)
%
hosts_size_
;
}
host_t
GuidedDistributor
::
locate_file_metadata
(
const
string
&
path
)
const
{
return
str_hash
(
path
)
%
hosts_size_
;
GuidedDistributor
::
locate_file_metadata
(
const
string
&
path
,
const
int
num_copy
)
const
{
return
(
str_hash
(
path
)
+
num_copy
)
%
hosts_size_
;
}
::
vector
<
host_t
>
GuidedDistributor
::
locate_directory_metadata
(
const
string
&
path
)
const
{
return
all_hosts_
;
...
...
src/common/rpc/rpc_util.cpp
View file @
957f7a63
...
...
@@ -36,6 +36,7 @@ extern "C" {
#include
<system_error>
using
namespace
std
;
namespace
gkfs
::
rpc
{
...
...
@@ -104,4 +105,111 @@ get_host_by_name(const string& hostname) {
}
#endif
/**
 * @brief Test whether a bit is set in a packed bit vector.
 *
 * Bits are packed 8 per byte, least-significant bit first within each byte.
 *
 * @param data packed bit vector
 * @param position zero-based index of the bit to query
 * @return true if the bit is set, false otherwise
 */
bool
get_bitset(const std::vector<uint8_t>& data, const uint16_t position) {
    const auto byte_idx = position / 8;
    const auto bit_mask = static_cast<uint8_t>(1u << (position % 8));
    return (data[byte_idx] & bit_mask) != 0;
}
/**
 * @brief Set (to 1) a bit in a packed bit vector.
 *
 * Bits are packed 8 per byte, least-significant bit first within each byte.
 * Setting an already-set bit is a no-op.
 *
 * @param data packed bit vector, modified in place
 * @param position zero-based index of the bit to set
 */
void
set_bitset(std::vector<uint8_t>& data, const uint16_t position) {
    const auto byte_idx = position / 8;
    data[byte_idx] = static_cast<uint8_t>(data[byte_idx] |
                                          (1u << (position % 8)));
}
/**
 * @brief Encode raw bytes as a base64 string.
 *
 * The output is padded with '=' so that its length is a multiple of four.
 *
 * @param data bytes to encode
 * @return base64 representation of @p data
 */
std::string
base64_encode(const std::vector<uint8_t>& data) {
    static const std::string base64_chars =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    uint32_t acc = 0; // bit accumulator
    int n_bits = 0;   // number of valid bits currently held in 'acc'

    for(const auto byte : data) {
        acc = (acc << 8) | byte;
        n_bits += 8;
        // emit one output character per complete 6-bit group
        while(n_bits >= 6) {
            n_bits -= 6;
            result.push_back(base64_chars[(acc >> n_bits) & 0x3F]);
        }
    }

    // flush the final partial group, zero-filled on the right
    if(n_bits > 0) {
        acc <<= 6 - n_bits;
        result.push_back(base64_chars[acc & 0x3F]);
    }

    // pad up to a multiple of four characters
    while(result.size() % 4 != 0) {
        result.push_back('=');
    }

    return result;
}
/**
 * @brief Decode a base64 string into raw bytes.
 *
 * Characters outside the base64 alphabet are skipped and decoding stops at
 * the first '=' padding character. Leftover bits (fewer than 8) after the
 * last full byte are encoder padding: they must be zero and are discarded.
 *
 * Fix: the previous implementation appended a spurious extra byte whenever
 * the input carried zero-valued leftover padding bits (i.e. any payload
 * whose length is not a multiple of 3), so decode(encode(x)) != x.
 *
 * @param encoded base64-encoded string (may carry trailing '=' padding)
 * @return decoded bytes; an empty vector if the trailing bits are invalid
 */
std::vector<uint8_t>
base64_decode(const std::string& encoded) {
    static const std::string base64_chars =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::vector<uint8_t> data;
    uint32_t buffer = 0;
    int bits = 0; // number of valid bits currently held in 'buffer'

    for(const char c : encoded) {
        if(c == '=')
            break; // trailing padding: nothing more to decode
        const auto value = base64_chars.find(c);
        if(value == std::string::npos)
            continue; // ignore characters outside the alphabet
        buffer = (buffer << 6) | static_cast<uint32_t>(value);
        bits += 6;
        if(bits >= 8) {
            bits -= 8;
            data.push_back(static_cast<uint8_t>((buffer >> bits) & 0xFF));
        }
    }

    // Any leftover bits stem from encoder padding and must all be zero;
    // otherwise the input is malformed and we reject it.
    if(bits > 0 && (buffer & ((1u << bits) - 1)) != 0)
        return {};

    return data;
}
/**
 * @brief Serialize a packed bit vector into a transport-friendly string.
 *
 * Currently implemented as plain base64 encoding.
 *
 * @param bytes packed bit vector
 * @return base64 representation of @p bytes
 */
std::string
compress_bitset(const std::vector<uint8_t>& bytes) {
    auto encoded = base64_encode(bytes);
    return encoded;
}
/**
 * @brief Deserialize a string produced by compress_bitset() back into a
 * packed bit vector.
 *
 * Currently implemented as plain base64 decoding.
 *
 * @param compressedString base64 representation of a packed bit vector
 * @return the decoded packed bit vector
 */
std::vector<uint8_t>
decompress_bitset(const std::string& compressedString) {
    auto raw = base64_decode(compressedString);
    return raw;
}
}
// namespace gkfs::rpc
\ No newline at end of file
src/daemon/handler/srv_data.cpp
View file @
957f7a63
...
...
@@ -40,6 +40,7 @@
#include
<daemon/ops/data.hpp>
#include
<common/rpc/rpc_types.hpp>
#include
<common/rpc/rpc_util.hpp>
#include
<common/rpc/distributor.hpp>
#include
<common/arithmetic/arithmetic.hpp>
#include
<common/statistics/stats.hpp>
...
...
@@ -51,9 +52,9 @@
#define AGIOS_WRITE 1
#define AGIOS_SERVER_ID_IGNORE 0
#endif
using
namespace
std
;
namespace
{
/**
...
...
@@ -115,6 +116,8 @@ rpc_srv_write(hg_handle_t handle) {
__func__
,
in
.
path
,
in
.
chunk_start
,
in
.
chunk_end
,
in
.
chunk_n
,
in
.
total_chunk_size
,
bulk_size
,
in
.
offset
);
std
::
vector
<
uint8_t
>
write_ops_vect
=
gkfs
::
rpc
::
decompress_bitset
(
in
.
wbitset
);
#ifdef GKFS_ENABLE_AGIOS
int
*
data
;
...
...
@@ -228,9 +231,9 @@ rpc_srv_write(hg_handle_t handle) {
chnk_id_file
<=
in
.
chunk_end
&&
chnk_id_curr
<
in
.
chunk_n
;
chnk_id_file
++
)
{
// Continue if chunk does not hash to this host
#ifndef GKFS_ENABLE_FORWARDING
if
(
RPC_DATA
->
distributor
()
->
locate_data
(
in
.
path
,
chnk_id_file
,
host_size
)
!=
host_id
)
{
if
(
!
(
gkfs
::
rpc
::
get_bitset
(
write_ops_vect
,
chnk_id_file
-
in
.
chunk_start
))
)
{
GKFS_DATA
->
spdlogger
()
->
trace
(
"{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'"
,
__func__
,
chnk_id_file
,
host_id
,
chnk_id_curr
);
...
...
@@ -240,7 +243,6 @@ rpc_srv_write(hg_handle_t handle) {
if
(
GKFS_DATA
->
enable_chunkstats
())
{
GKFS_DATA
->
stats
()
->
add_write
(
in
.
path
,
chnk_id_file
);
}
#endif
chnk_ids_host
[
chnk_id_curr
]
=
chnk_id_file
;
// save this id to host chunk list
...
...
@@ -417,7 +419,8 @@ rpc_srv_read(hg_handle_t handle) {
"{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'"
,
__func__
,
in
.
path
,
in
.
chunk_start
,
in
.
chunk_end
,
in
.
chunk_n
,
in
.
total_chunk_size
,
bulk_size
,
in
.
offset
);
std
::
vector
<
uint8_t
>
read_bitset_vect
=
gkfs
::
rpc
::
decompress_bitset
(
in
.
wbitset
);
#ifdef GKFS_ENABLE_AGIOS
int
*
data
;
ABT_eventual
eventual
=
ABT_EVENTUAL_NULL
;
...
...
@@ -485,10 +488,9 @@ rpc_srv_read(hg_handle_t handle) {
__func__
);
return
gkfs
::
rpc
::
cleanup_respond
(
&
handle
,
&
in
,
&
out
,
&
bulk_handle
);
}
#ifndef GKFS_ENABLE_FORWARDING
auto
const
host_id
=
in
.
host_id
;
auto
const
host_size
=
in
.
host_size
;
#endif
auto
path
=
make_shared
<
string
>
(
in
.
path
);
// chnk_ids used by this host
vector
<
uint64_t
>
chnk_ids_host
(
in
.
chunk_n
);
...
...
@@ -519,9 +521,11 @@ rpc_srv_read(hg_handle_t handle) {
chnk_id_file
<=
in
.
chunk_end
&&
chnk_id_curr
<
in
.
chunk_n
;
chnk_id_file
++
)
{
// Continue if chunk does not hash to this host
#ifndef GKFS_ENABLE_FORWARDING
if
(
RPC_DATA
->
distributor
()
->
locate_data
(
in
.
path
,
chnk_id_file
,
host_size
)
!=
host_id
)
{
// We only check if we are not using replicas
if
(
!
(
gkfs
::
rpc
::
get_bitset
(
read_bitset_vect
,
chnk_id_file
-
in
.
chunk_start
)))
{
GKFS_DATA
->
spdlogger
()
->
trace
(
"{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'"
,
__func__
,
chnk_id_file
,
host_id
,
chnk_id_curr
);
...
...
@@ -530,7 +534,7 @@ rpc_srv_read(hg_handle_t handle) {
if
(
GKFS_DATA
->
enable_chunkstats
())
{
GKFS_DATA
->
stats
()
->
add_read
(
in
.
path
,
chnk_id_file
);
}
#endif
chnk_ids_host
[
chnk_id_curr
]
=
chnk_id_file
;
// save this id to host chunk list
...
...
@@ -597,6 +601,10 @@ rpc_srv_read(hg_handle_t handle) {
GKFS_DATA
->
spdlogger
()
->
warn
(
"{}() Not all chunks were detected!!! Size left {}"
,
__func__
,
chnk_size_left_host
);
if
(
chnk_size_left_host
==
in
.
total_chunk_size
)
return
HG_CANCELED
;
/*
* 4. Read task results and accumulate in out.io_size
*/
...
...
tests/unit/test_guided_distributor.cpp
View file @
957f7a63
...
...
@@ -39,23 +39,23 @@ TEST_CASE( "Guided distributor Testing", "[Distributor]" ) {
// The distributor should return 3 for all the tested files
auto
d
=
gkfs
::
rpc
::
GuidedDistributor
();
REQUIRE
(
d
.
locate_data
(
"/t.c01"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c02"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c03"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c04"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c05"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c06"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c07"
,
1
,
10
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c01"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c02"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c03"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c04"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c05"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c06"
,
1
,
10
,
0
)
==
3
);
REQUIRE
(
d
.
locate_data
(
"/t.c07"
,
1
,
10
,
0
)
==
3
);
// Next result is random, but with the same seed is consistent
// We ask for chunk 5 that is distributed randomly between the
// 10 servers.
REQUIRE
(
(
d
.
locate_data
(
"/t.c01"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c02"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c03"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c04"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c05"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c06"
,
5
,
10
)
+
d
.
locate_data
(
"/t.c07"
,
5
,
10
)
)
==
42
);
REQUIRE
(
(
d
.
locate_data
(
"/t.c01"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c02"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c03"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c04"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c05"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c06"
,
5
,
10
,
0
)
+
d
.
locate_data
(
"/t.c07"
,
5
,
10
,
0
)
)
==
42
);
}
}
Prev
1
2
Next